{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Consulting Project \n",
    "## Recommender Systems - Solutions"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The whole world seems to be hearing about your new amazing abilities to analyze big data and build useful systems for them! You've just taken up a new contract with a new online food delivery company. This company is trying to differentiate itself by recommending new meals to customers based off of other customers likings.\n",
    "\n",
    "Can you build them a recommendation system?\n",
    "\n",
    "Your final result should be in the form of a function that can take in a Spark DataFrame of a single customer's ratings for various meals and output their top 3 suggested meals. For example:\n",
    "\n",
    "Best of luck!\n",
    "\n",
    "** *Note from Jose: I completely made up this food data, so its likely that the actual recommendations themselves won't make any sense. But you should get a similar output to what I did given the example customer dataframe* **"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 31,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "import pandas as pd"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 32,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "df = pd.read_csv('movielens_ratings.csv')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 33,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>count</th>\n",
       "      <th>mean</th>\n",
       "      <th>std</th>\n",
       "      <th>min</th>\n",
       "      <th>25%</th>\n",
       "      <th>50%</th>\n",
       "      <th>75%</th>\n",
       "      <th>max</th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>movieId</th>\n",
       "      <td>1501.0</td>\n",
       "      <td>49.405730</td>\n",
       "      <td>28.937034</td>\n",
       "      <td>0.0</td>\n",
       "      <td>24.0</td>\n",
       "      <td>50.0</td>\n",
       "      <td>74.0</td>\n",
       "      <td>99.0</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>rating</th>\n",
       "      <td>1501.0</td>\n",
       "      <td>1.774151</td>\n",
       "      <td>1.187276</td>\n",
       "      <td>1.0</td>\n",
       "      <td>1.0</td>\n",
       "      <td>1.0</td>\n",
       "      <td>2.0</td>\n",
       "      <td>5.0</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>userId</th>\n",
       "      <td>1501.0</td>\n",
       "      <td>14.383744</td>\n",
       "      <td>8.591040</td>\n",
       "      <td>0.0</td>\n",
       "      <td>7.0</td>\n",
       "      <td>14.0</td>\n",
       "      <td>22.0</td>\n",
       "      <td>29.0</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "          count       mean        std  min   25%   50%   75%   max\n",
       "movieId  1501.0  49.405730  28.937034  0.0  24.0  50.0  74.0  99.0\n",
       "rating   1501.0   1.774151   1.187276  1.0   1.0   1.0   2.0   5.0\n",
       "userId   1501.0  14.383744   8.591040  0.0   7.0  14.0  22.0  29.0"
      ]
     },
     "execution_count": 33,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df.describe().transpose()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 34,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>movieId</th>\n",
       "      <th>rating</th>\n",
       "      <th>userId</th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>movieId</th>\n",
       "      <td>1.000000</td>\n",
       "      <td>0.036569</td>\n",
       "      <td>0.003267</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>rating</th>\n",
       "      <td>0.036569</td>\n",
       "      <td>1.000000</td>\n",
       "      <td>0.056411</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>userId</th>\n",
       "      <td>0.003267</td>\n",
       "      <td>0.056411</td>\n",
       "      <td>1.000000</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "          movieId    rating    userId\n",
       "movieId  1.000000  0.036569  0.003267\n",
       "rating   0.036569  1.000000  0.056411\n",
       "userId   0.003267  0.056411  1.000000"
      ]
     },
     "execution_count": 34,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df.corr()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 35,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "import numpy as np\n",
    "df['mealskew'] = df['movieId'].apply(lambda id: np.nan if id > 31 else id)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "/Users/marci/anaconda/lib/python3.5/site-packages/numpy/lib/function_base.py:3834: RuntimeWarning: Invalid value encountered in percentile\n",
      "  RuntimeWarning)\n"
     ]
    },
    {
     "data": {
      "text/html": [
       "<div>\n",
       "<table border=\"1\" class=\"dataframe\">\n",
       "  <thead>\n",
       "    <tr style=\"text-align: right;\">\n",
       "      <th></th>\n",
       "      <th>count</th>\n",
       "      <th>mean</th>\n",
       "      <th>std</th>\n",
       "      <th>min</th>\n",
       "      <th>25%</th>\n",
       "      <th>50%</th>\n",
       "      <th>75%</th>\n",
       "      <th>max</th>\n",
       "    </tr>\n",
       "  </thead>\n",
       "  <tbody>\n",
       "    <tr>\n",
       "      <th>movieId</th>\n",
       "      <td>1501.0</td>\n",
       "      <td>49.405730</td>\n",
       "      <td>28.937034</td>\n",
       "      <td>0.0</td>\n",
       "      <td>24.0</td>\n",
       "      <td>50.0</td>\n",
       "      <td>74.0</td>\n",
       "      <td>99.0</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>rating</th>\n",
       "      <td>1501.0</td>\n",
       "      <td>1.774151</td>\n",
       "      <td>1.187276</td>\n",
       "      <td>1.0</td>\n",
       "      <td>1.0</td>\n",
       "      <td>1.0</td>\n",
       "      <td>2.0</td>\n",
       "      <td>5.0</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>userId</th>\n",
       "      <td>1501.0</td>\n",
       "      <td>14.383744</td>\n",
       "      <td>8.591040</td>\n",
       "      <td>0.0</td>\n",
       "      <td>7.0</td>\n",
       "      <td>14.0</td>\n",
       "      <td>22.0</td>\n",
       "      <td>29.0</td>\n",
       "    </tr>\n",
       "    <tr>\n",
       "      <th>mealskew</th>\n",
       "      <td>486.0</td>\n",
       "      <td>15.502058</td>\n",
       "      <td>9.250634</td>\n",
       "      <td>0.0</td>\n",
       "      <td>NaN</td>\n",
       "      <td>NaN</td>\n",
       "      <td>NaN</td>\n",
       "      <td>31.0</td>\n",
       "    </tr>\n",
       "  </tbody>\n",
       "</table>\n",
       "</div>"
      ],
      "text/plain": [
       "           count       mean        std  min   25%   50%   75%   max\n",
       "movieId   1501.0  49.405730  28.937034  0.0  24.0  50.0  74.0  99.0\n",
       "rating    1501.0   1.774151   1.187276  1.0   1.0   1.0   2.0   5.0\n",
       "userId    1501.0  14.383744   8.591040  0.0   7.0  14.0  22.0  29.0\n",
       "mealskew   486.0  15.502058   9.250634  0.0   NaN   NaN   NaN  31.0"
      ]
     },
     "execution_count": 11,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "df.describe().transpose()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 36,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "mealmap = { 2. : \"Chicken Curry\",   \n",
    "           3. : \"Spicy Chicken Nuggest\",   \n",
    "           5. : \"Hamburger\",   \n",
    "           9. : \"Taco Surprise\",  \n",
    "           11. : \"Meatloaf\",  \n",
    "           12. : \"Ceaser Salad\",  \n",
    "           15. : \"BBQ Ribs\",  \n",
    "           17. : \"Sushi Plate\",  \n",
    "           19. : \"Cheesesteak Sandwhich\",  \n",
    "           21. : \"Lasagna\",  \n",
    "           23. : \"Orange Chicken\",\n",
    "           26. : \"Spicy Beef Plate\",  \n",
    "           27. : \"Salmon with Mashed Potatoes\",  \n",
    "           28. : \"Penne Tomatoe Pasta\",  \n",
    "           29. : \"Pork Sliders\",  \n",
    "           30. : \"Vietnamese Sandwich\",  \n",
    "           31. : \"Chicken Wrap\",  \n",
    "           np.nan: \"Cowboy Burger\",   \n",
    "           4. : \"Pretzels and Cheese Plate\",   \n",
    "           6. : \"Spicy Pork Sliders\",  \n",
    "           13. : \"Mandarin Chicken PLate\",  \n",
    "           14. : \"Kung Pao Chicken\",\n",
    "           16. : \"Fried Rice Plate\",  \n",
    "           8. : \"Chicken Chow Mein\",  \n",
    "           10. : \"Roasted Eggplant \",  \n",
    "           18. : \"Pepperoni Pizza\",  \n",
    "           22. : \"Pulled Pork Plate\",   \n",
    "           0. : \"Cheese Pizza\",   \n",
    "           1. : \"Burrito\",   \n",
    "           7. : \"Nachos\",  \n",
    "           24. : \"Chili\",  \n",
    "           20. : \"Southwest Salad\",  \n",
    "           25.: \"Roast Beef Sandwich\"}"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 37,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "df['meal_name'] = df['mealskew'].map(mealmap)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 44,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "df.to_csv('Meal_Info.csv',index=False)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 39,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "from pyspark.sql import SparkSession"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 42,
   "metadata": {
    "collapsed": false
   },
   "outputs": [],
   "source": [
    "spark = SparkSession.builder.appName('recconsulting').getOrCreate()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 45,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "from pyspark.ml.evaluation import RegressionEvaluator\n",
    "from pyspark.ml.recommendation import ALS"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 46,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "data = spark.read.csv('Meal_Info.csv',inferSchema=True,header=True)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 47,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "(training, test) = data.randomSplit([0.8, 0.2])"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 48,
   "metadata": {
    "collapsed": false
   },
   "outputs": [
    {
     "ename": "Py4JJavaError",
     "evalue": "An error occurred while calling o49.fit.\n: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 (TID 4, localhost): java.lang.NullPointerException: Value at index 1 is null\n\tat org.apache.spark.sql.Row$class.getAnyValAs(Row.scala:466)\n\tat org.apache.spark.sql.Row$class.getInt(Row.scala:217)\n\tat org.apache.spark.sql.catalyst.expressions.GenericRow.getInt(rows.scala:192)\n\tat org.apache.spark.ml.recommendation.ALS$$anonfun$8.apply(ALS.scala:458)\n\tat org.apache.spark.ml.recommendation.ALS$$anonfun$8.apply(ALS.scala:457)\n\tat scala.collection.Iterator$$anon$11.next(Iterator.scala:409)\n\tat scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)\n\tat scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)\n\tat scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:211)\n\tat org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)\n\tat org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)\n\tat org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:86)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\tat java.lang.Thread.run(Thread.java:745)\n\nDriver stacktrace:\n\tat org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)\n\tat scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)\n\tat scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)\n\tat org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)\n\tat scala.Option.foreach(Option.scala:257)\n\tat org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)\n\tat org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)\n\tat org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1890)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1903)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1916)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1930)\n\tat org.apache.spark.rdd.RDD.count(RDD.scala:1134)\n\tat org.apache.spark.ml.recommendation.ALS$.train(ALS.scala:694)\n\tat org.apache.spark.ml.recommendation.ALS.fit(ALS.scala:464)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:498)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)\n\tat py4j.Gateway.invoke(Gateway.java:280)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.GatewayConnection.run(GatewayConnection.java:214)\n\tat java.lang.Thread.run(Thread.java:745)\nCaused by: java.lang.NullPointerException: Value at index 1 is null\n\tat org.apache.spark.sql.Row$class.getAnyValAs(Row.scala:466)\n\tat org.apache.spark.sql.Row$class.getInt(Row.scala:217)\n\tat org.apache.spark.sql.catalyst.expressions.GenericRow.getInt(rows.scala:192)\n\tat org.apache.spark.ml.recommendation.ALS$$anonfun$8.apply(ALS.scala:458)\n\tat org.apache.spark.ml.recommendation.ALS$$anonfun$8.apply(ALS.scala:457)\n\tat scala.collection.Iterator$$anon$11.next(Iterator.scala:409)\n\tat scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)\n\tat scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)\n\tat scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:211)\n\tat org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)\n\tat org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)\n\tat org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:86)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\t... 1 more\n",
     "output_type": "error",
     "traceback": [
      "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
      "\u001b[0;31mPy4JJavaError\u001b[0m                             Traceback (most recent call last)",
      "\u001b[0;32m<ipython-input-48-e2e37f46bdc6>\u001b[0m in \u001b[0;36m<module>\u001b[0;34m()\u001b[0m\n\u001b[1;32m      1\u001b[0m \u001b[0;31m# Build the recommendation model using ALS on the training data\u001b[0m\u001b[0;34m\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m      2\u001b[0m \u001b[0mals\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mALS\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mmaxIter\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;36m5\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mregParam\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;36m0.01\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0muserCol\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;34m\"userId\"\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mitemCol\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;34m\"mealskew\"\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mratingCol\u001b[0m\u001b[0;34m=\u001b[0m\u001b[0;34m\"rating\"\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m----> 3\u001b[0;31m \u001b[0mmodel\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mals\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mfit\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mtraining\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m",
      "\u001b[0;32m/usr/local/Cellar/apache-spark/2.0.1/libexec/python/pyspark/ml/base.py\u001b[0m in \u001b[0;36mfit\u001b[0;34m(self, dataset, params)\u001b[0m\n\u001b[1;32m     62\u001b[0m                 \u001b[0;32mreturn\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mcopy\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mparams\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_fit\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mdataset\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m     63\u001b[0m             \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 64\u001b[0;31m                 \u001b[0;32mreturn\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_fit\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mdataset\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m     65\u001b[0m         \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m     66\u001b[0m             raise ValueError(\"Params must be either a param map or a list/tuple of param maps, \"\n",
      "\u001b[0;32m/usr/local/Cellar/apache-spark/2.0.1/libexec/python/pyspark/ml/wrapper.py\u001b[0m in \u001b[0;36m_fit\u001b[0;34m(self, dataset)\u001b[0m\n\u001b[1;32m    211\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    212\u001b[0m     \u001b[0;32mdef\u001b[0m \u001b[0m_fit\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mdataset\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 213\u001b[0;31m         \u001b[0mjava_model\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_fit_java\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mdataset\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m    214\u001b[0m         \u001b[0;32mreturn\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_create_model\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mjava_model\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    215\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;32m/usr/local/Cellar/apache-spark/2.0.1/libexec/python/pyspark/ml/wrapper.py\u001b[0m in \u001b[0;36m_fit_java\u001b[0;34m(self, dataset)\u001b[0m\n\u001b[1;32m    208\u001b[0m         \"\"\"\n\u001b[1;32m    209\u001b[0m         \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_transfer_params_to_java\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 210\u001b[0;31m         \u001b[0;32mreturn\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_java_obj\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mfit\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mdataset\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0m_jdf\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m    211\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    212\u001b[0m     \u001b[0;32mdef\u001b[0m \u001b[0m_fit\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mself\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0mdataset\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;32m/Users/marci/anaconda/lib/python3.5/site-packages/py4j/java_gateway.py\u001b[0m in \u001b[0;36m__call__\u001b[0;34m(self, *args)\u001b[0m\n\u001b[1;32m   1131\u001b[0m         \u001b[0manswer\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0mself\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mgateway_client\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0msend_command\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0mcommand\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m   1132\u001b[0m         return_value = get_return_value(\n\u001b[0;32m-> 1133\u001b[0;31m             answer, self.gateway_client, self.target_id, self.name)\n\u001b[0m\u001b[1;32m   1134\u001b[0m \u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m   1135\u001b[0m         \u001b[0;32mfor\u001b[0m \u001b[0mtemp_arg\u001b[0m \u001b[0;32min\u001b[0m \u001b[0mtemp_args\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;32m/usr/local/Cellar/apache-spark/2.0.1/libexec/python/pyspark/sql/utils.py\u001b[0m in \u001b[0;36mdeco\u001b[0;34m(*a, **kw)\u001b[0m\n\u001b[1;32m     61\u001b[0m     \u001b[0;32mdef\u001b[0m \u001b[0mdeco\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m*\u001b[0m\u001b[0ma\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkw\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m     62\u001b[0m         \u001b[0;32mtry\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m---> 63\u001b[0;31m             \u001b[0;32mreturn\u001b[0m \u001b[0mf\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m*\u001b[0m\u001b[0ma\u001b[0m\u001b[0;34m,\u001b[0m \u001b[0;34m**\u001b[0m\u001b[0mkw\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0m\u001b[1;32m     64\u001b[0m         \u001b[0;32mexcept\u001b[0m \u001b[0mpy4j\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mprotocol\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mPy4JJavaError\u001b[0m \u001b[0;32mas\u001b[0m \u001b[0me\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m     65\u001b[0m             \u001b[0ms\u001b[0m \u001b[0;34m=\u001b[0m \u001b[0me\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mjava_exception\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0mtoString\u001b[0m\u001b[0;34m(\u001b[0m\u001b[0;34m)\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n",
      "\u001b[0;32m/Users/marci/anaconda/lib/python3.5/site-packages/py4j/protocol.py\u001b[0m in \u001b[0;36mget_return_value\u001b[0;34m(answer, gateway_client, target_id, name)\u001b[0m\n\u001b[1;32m    317\u001b[0m                 raise Py4JJavaError(\n\u001b[1;32m    318\u001b[0m                     \u001b[0;34m\"An error occurred while calling {0}{1}{2}.\\n\"\u001b[0m\u001b[0;34m.\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[0;32m--> 319\u001b[0;31m                     format(target_id, \".\", name), value)\n\u001b[0m\u001b[1;32m    320\u001b[0m             \u001b[0;32melse\u001b[0m\u001b[0;34m:\u001b[0m\u001b[0;34m\u001b[0m\u001b[0m\n\u001b[1;32m    321\u001b[0m                 raise Py4JError(\n",
      "\u001b[0;31mPy4JJavaError\u001b[0m: An error occurred while calling o49.fit.\n: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 (TID 4, localhost): java.lang.NullPointerException: Value at index 1 is null\n\tat org.apache.spark.sql.Row$class.getAnyValAs(Row.scala:466)\n\tat org.apache.spark.sql.Row$class.getInt(Row.scala:217)\n\tat org.apache.spark.sql.catalyst.expressions.GenericRow.getInt(rows.scala:192)\n\tat org.apache.spark.ml.recommendation.ALS$$anonfun$8.apply(ALS.scala:458)\n\tat org.apache.spark.ml.recommendation.ALS$$anonfun$8.apply(ALS.scala:457)\n\tat scala.collection.Iterator$$anon$11.next(Iterator.scala:409)\n\tat scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)\n\tat scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)\n\tat scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:211)\n\tat org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)\n\tat org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)\n\tat org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:86)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\tat java.lang.Thread.run(Thread.java:745)\n\nDriver stacktrace:\n\tat org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)\n\tat scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)\n\tat scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)\n\tat org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)\n\tat org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)\n\tat scala.Option.foreach(Option.scala:257)\n\tat org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)\n\tat org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)\n\tat org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)\n\tat org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1890)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1903)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1916)\n\tat org.apache.spark.SparkContext.runJob(SparkContext.scala:1930)\n\tat org.apache.spark.rdd.RDD.count(RDD.scala:1134)\n\tat org.apache.spark.ml.recommendation.ALS$.train(ALS.scala:694)\n\tat org.apache.spark.ml.recommendation.ALS.fit(ALS.scala:464)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)\n\tat sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)\n\tat sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)\n\tat java.lang.reflect.Method.invoke(Method.java:498)\n\tat py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)\n\tat py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)\n\tat py4j.Gateway.invoke(Gateway.java:280)\n\tat py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)\n\tat py4j.commands.CallCommand.execute(CallCommand.java:79)\n\tat py4j.GatewayConnection.run(GatewayConnection.java:214)\n\tat java.lang.Thread.run(Thread.java:745)\nCaused by: java.lang.NullPointerException: Value at index 1 is null\n\tat org.apache.spark.sql.Row$class.getAnyValAs(Row.scala:466)\n\tat org.apache.spark.sql.Row$class.getInt(Row.scala:217)\n\tat org.apache.spark.sql.catalyst.expressions.GenericRow.getInt(rows.scala:192)\n\tat org.apache.spark.ml.recommendation.ALS$$anonfun$8.apply(ALS.scala:458)\n\tat org.apache.spark.ml.recommendation.ALS$$anonfun$8.apply(ALS.scala:457)\n\tat scala.collection.Iterator$$anon$11.next(Iterator.scala:409)\n\tat scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)\n\tat scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)\n\tat scala.collection.Iterator$JoinIterator.hasNext(Iterator.scala:211)\n\tat org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)\n\tat org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)\n\tat org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:86)\n\tat org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)\n\tat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)\n\tat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)\n\t... 1 more\n"
     ]
    }
   ],
   "source": [
    "# Build the recommendation model using ALS on the training data\n",
    "als = ALS(maxIter=5, regParam=0.01, userCol=\"userId\", itemCol=\"mealskew\", ratingCol=\"rating\")\n",
    "model = als.fit(training)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": [
    "# Evaluate the model by computing the RMSE on the test data\n",
    "predictions = model.transform(test)\n",
    "\n",
    "predictions.show()\n",
    "\n",
    "evaluator = RegressionEvaluator(metricName=\"rmse\", labelCol=\"rating\",predictionCol=\"prediction\")\n",
    "rmse = evaluator.evaluate(predictions)\n",
    "print(\"Root-mean-square error = \" + str(rmse))"
   ]
  }
 ],
 "metadata": {
  "anaconda-cloud": {},
  "kernelspec": {
   "display_name": "Python [conda root]",
   "language": "python",
   "name": "conda-root-py"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.5.3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 0
}
